From e10158b99b0c8fd37ca2a3e208e82116eaf4e0cc Mon Sep 17 00:00:00 2001 From: "jrb44@swoop.cl.cam.ac.uk" Date: Thu, 24 Mar 2005 18:50:40 +0000 Subject: [PATCH] bitkeeper revision 1.1236.43.3 (42430c00lAWor_gGz6AYWnPE5wc0XQ) Added concurrency niceness to the bottom end of the blockstore. Signed-off-by: James Bulpin --- tools/blktap/blockstore.c | 120 ++++++++++++++++++++++++++++++++++---- tools/blktap/blockstore.h | 4 +- 2 files changed, 111 insertions(+), 13 deletions(-) diff --git a/tools/blktap/blockstore.c b/tools/blktap/blockstore.c index b01bf2117b..5de2a6885a 100644 --- a/tools/blktap/blockstore.c +++ b/tools/blktap/blockstore.c @@ -15,10 +15,11 @@ #include #include #include "blockstore.h" +#include #include "parallax-threaded.h" #define BLOCKSTORE_REMOTE -//#define BSDEBUG +#define BSDEBUG /***************************************************************************** * Debugging @@ -27,7 +28,7 @@ void DB(char *format, ...) { va_list args; - + fprintf(stderr, "[%05u] ", (int)pthread_getspecific(tid_key)); va_start(args, format); vfprintf(stderr, format, args); va_end(args); @@ -43,10 +44,6 @@ void DB(char *format, ...) #include #include -/***************************************************************************** - * * - *****************************************************************************/ - /***************************************************************************** * Network state * *****************************************************************************/ @@ -71,8 +68,30 @@ int bssock = 0; /* Protects the queue manipulation critcal regions. */ -#define ENTER_QUEUE_CR (void)0 -#define LEAVE_QUEUE_CR (void)0 +pthread_mutex_t ptmutex_queue; +#define ENTER_QUEUE_CR pthread_mutex_lock(&ptmutex_queue) +#define LEAVE_QUEUE_CR pthread_mutex_unlock(&ptmutex_queue) + +pthread_mutex_t ptmutex_recv; +#define ENTER_RECV_CR pthread_mutex_lock(&ptmutex_recv) +#define LEAVE_RECV_CR pthread_mutex_unlock(&ptmutex_recv) + +int notify = 0; +pthread_mutex_t ptmutex_notify; +pthread_cond_t ptcv_notify; +#define RECV_NOTIFY { \ + pthread_mutex_lock(&ptmutex_notify); \ + notify = 1; \ + pthread_cond_signal(&ptcv_notify); \ + pthread_mutex_unlock(&ptmutex_notify); } +#define RECV_AWAIT { \ + pthread_mutex_lock(&ptmutex_notify); \ + if (notify) \ + notify = 0; \ + else \ + pthread_cond_wait(&ptcv_notify, &ptmutex_notify); \ + pthread_mutex_unlock(&ptmutex_notify); } + /* A message queue entry. We allocate one of these for every request we send. * Asynchronous reply reception also used one of these. @@ -91,8 +110,9 @@ typedef struct bsq_t_struct { #define BSQ_STATUS_MATCHED 1 -#define ENTER_LUID_CR (void)0 -#define LEAVE_LUID_CR (void)0 +pthread_mutex_t ptmutex_luid; +#define ENTER_LUID_CR pthread_mutex_lock(&ptmutex_luid) +#define LEAVE_LUID_CR pthread_mutex_unlock(&ptmutex_luid) static u64 luid_cnt = 0x1000ULL; u64 new_luid(void) { @@ -218,6 +238,10 @@ bsq_t *queuesearch(bsq_t *qe) { return q; } +/***************************************************************************** + * Network communication * + *****************************************************************************/ + int send_message(bsq_t *qe) { int rc; @@ -331,7 +355,7 @@ bsq_t rx_qe; bsq_t *recv_any(void) { struct sockaddr_in from; int rc; - + DB("ENTER recv_any\n"); rx_qe.msghdr.msg_name = &from; @@ -361,6 +385,7 @@ bsq_t *recv_any(void) { perror("recv_any"); return NULL; } + rx_qe.length = rc; rx_qe.server = get_server_number(&from); @@ -395,8 +420,13 @@ int wait_recv(bsq_t **reqs, int numreqs) { return numreqs; } + RECV_AWAIT; + + /* rxagain: + ENTER_RECV_CR; q = recv_any(); + LEAVE_RECV_CR; if (!q) return -1; @@ -406,11 +436,42 @@ int wait_recv(bsq_t **reqs, int numreqs) { fprintf(stderr, "Unmatched RX\n"); goto rxagain; } + */ goto checkmatch; } +/* receive loop + */ +void *receive_loop(void *arg) +{ + bsq_t *q, *m; + + for(;;) { + q = recv_any(); + if (!q) { + fprintf(stderr, "recv_any error\n"); + } + else { + m = queuesearch(q); + recv_recycle_buffer(q); + if (!m) { + fprintf(stderr, "Unmatched RX\n"); + } + else { + DB("RX MATCH"); + RECV_NOTIFY; + } + } + } +} +pthread_t pthread_recv; + +/***************************************************************************** + * Reading * + *****************************************************************************/ + void *readblock_indiv(int server, u64 id) { void *block; bsq_t *qe; @@ -538,6 +599,10 @@ void *readblock(u64 id) { return block; } +/***************************************************************************** + * Writing * + *****************************************************************************/ + bsq_t *writeblock_indiv(int server, u64 id, void *block) { bsq_t *qe; @@ -663,6 +728,10 @@ int writeblock(u64 id, void *block) { return -1; } +/***************************************************************************** + * Allocation * + *****************************************************************************/ + /** * allocblock: write a new block to disk * @block: pointer to block @@ -791,6 +860,9 @@ u64 allocblock_hint(void *block, u64 hint) { #else /* /BLOCKSTORE_REMOTE */ +/***************************************************************************** + * Local storage version * + *****************************************************************************/ /** * readblock: read a block from disk @@ -923,6 +995,10 @@ u64 allocblock_hint(void *block, u64 hint) { #endif /* BLOCKSTORE_REMOTE */ +/***************************************************************************** + * Memory management * + *****************************************************************************/ + /** * newblock: get a new in-memory block set to zeros * @@ -1053,6 +1129,10 @@ void freelist_count(int print_each) printf("Total of %Ld ids on freelist.\n", total); } +/***************************************************************************** + * Initialisation * + *****************************************************************************/ + int __init_blockstore(void) { int i; @@ -1062,6 +1142,13 @@ int __init_blockstore(void) #ifdef BLOCKSTORE_REMOTE struct hostent *addr; + + pthread_mutex_init(&ptmutex_queue, NULL); + pthread_mutex_init(&ptmutex_luid, NULL); + pthread_mutex_init(&ptmutex_recv, NULL); + pthread_mutex_init(&ptmutex_notify, NULL); + pthread_cond_init(&ptcv_notify, NULL); + bsservers[0].hostname = "firebug.cl.cam.ac.uk"; bsservers[1].hostname = "planb.cl.cam.ac.uk"; bsservers[2].hostname = "simcity.cl.cam.ac.uk"; @@ -1137,6 +1224,8 @@ int __init_blockstore(void) return -1; } + pthread_create(&pthread_recv, NULL, receive_loop, NULL); + #else /* /BLOCKSTORE_REMOTE */ block_fp = open("blockstore.dat", O_RDWR | O_CREAT | O_LARGEFILE, 0644); @@ -1170,3 +1259,12 @@ int __init_blockstore(void) #endif /* BLOCKSTORE_REMOTE */ return 0; } + +void __exit_blockstore(void) +{ + pthread_mutex_destroy(&ptmutex_recv); + pthread_mutex_destroy(&ptmutex_luid); + pthread_mutex_destroy(&ptmutex_queue); + pthread_mutex_destroy(&ptmutex_notify); + pthread_cond_destroy(&ptcv_notify); +} diff --git a/tools/blktap/blockstore.h b/tools/blktap/blockstore.h index c5fdeef7bb..8415786511 100644 --- a/tools/blktap/blockstore.h +++ b/tools/blktap/blockstore.h @@ -22,7 +22,7 @@ #endif #define FREEBLOCK_SIZE (BLOCK_SIZE / sizeof(u64)) - (3 * sizeof(u64)) -#define FREEBLOCK_MAGIC 0x0fee0fee0fee0fee +#define FREEBLOCK_MAGIC 0x0fee0fee0fee0feeULL typedef struct { u64 magic; @@ -31,7 +31,7 @@ typedef struct { u64 list[FREEBLOCK_SIZE]; } freeblock_t; -#define BLOCKSTORE_MAGIC 0xaaaaaaa00aaaaaaa +#define BLOCKSTORE_MAGIC 0xaaaaaaa00aaaaaaaULL #define BLOCKSTORE_SUPER 1ULL typedef struct { -- 2.30.2